---
title: Develop a new worker type
description: Learn how to create a Prefect worker to run your flows.
---

Prefect workers set up execution infrastructure and start flow runs.

<Tip>
**Advanced topic**

Extending the Prefect framework by developing a worker requires deep knowledge of 
Prefect. For standard needs, we recommend using one of the [available worker types](/v3/deploy/infrastructure-concepts/workers/#worker-types).
</Tip>

This guide walks you through creating a custom worker that can run your flows on your chosen infrastructure.

## Worker configuration

When setting up an execution environment for a flow run, a worker receives configuration for the infrastructure it is 
designed to work with. Examples of configuration values include memory allocation, CPU allocation, credentials, and image name.
The worker uses this configuration to create the execution environment and start the flow run.

<Tip>
**How are the configuration values populated?**

The work pool that a worker polls for flow runs has a [base job template](/v3/deploy/infrastructure-concepts/work-pools/#base-job-template) 
associated with it. 
The template is the contract for how configuration values populate for each flow run.

The keys in the `job_configuration` section of this base job template match the worker's configuration class attributes. 
The values in the `job_configuration` section of the base job template populate the attributes of the worker's 
configuration class.

The work pool creator gets to decide how they want to populate the values in the `job_configuration` section of the base 
job template. The values can be hard-coded, templated using placeholders, or a mix of these two approaches. Because you, 
as the worker developer, don't know how the work pool creator will populate the values, you should set sensible defaults 
for your configuration class attributes as a matter of best practice.
</Tip>

### Implementing a `BaseJobConfiguration` subclass

A worker defines the configuration it needs to function with a class that extends `BaseJobConfiguration`.

`BaseJobConfiguration` has attributes common to all workers:

| Attribute | Description |
| --------- | ----------- |
| `name` | The name to assign to the created execution environment. |
| `env` | Environment variables to set in the created execution environment. |
| `labels` | The labels assigned to the created execution environment for metadata purposes. |
| `command` | The command to use when starting a flow run. |

Prefect sets values for each attribute before giving the configuration to the worker. If you want to customize 
the values of these attributes, use the `prepare_for_flow_run` method.

Here's an example `prepare_for_flow_run` method that adds a label to the execution environment:

```python
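def prepare_for_flow_run(
    self, flow_run, deployment=None, flow=None, work_pool=None, worker_name=None
):
    # Let the base class populate the standard attributes first.
    super().prepare_for_flow_run(flow_run, deployment, flow, work_pool, worker_name)
    # `labels` maps label names to values, so add an entry instead of appending.
    self.labels["my-custom-label"] = "true"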
```

A worker configuration class is a [Pydantic model](https://docs.pydantic.dev/usage/models/), so you can add additional 
attributes to your configuration class as Pydantic fields. For example, if you want to allow memory and CPU requests for your worker, 
you can do so like this:

```python
from pydantic import Field
from prefect.workers.base import BaseJobConfiguration

class MyWorkerConfiguration(BaseJobConfiguration):
    memory: int = Field(
        default=1024,
        description="Memory allocation for the execution environment."
    )
    cpu: int = Field(
        default=500,
        description="CPU allocation for the execution environment."
    )
```

This configuration class populates the `job_configuration` section of the resulting base job template.

For this example, the base job template would look like this:

```yaml
job_configuration:
    name: "{{ name }}"
    env: "{{ env }}"
    labels: "{{ labels }}"
    command: "{{ command }}"
    memory: "{{ memory }}"
    cpu: "{{ cpu }}"
variables:
    type: object
    properties:
        name:
          title: Name
          description: Name given to infrastructure created by a worker.
          type: string
        env:
          title: Environment Variables
          description: Environment variables to set when starting a flow run.
          type: object
          additionalProperties:
            type: string
        labels:
          title: Labels
          description: Labels applied to infrastructure created by a worker.
          type: object
          additionalProperties:
            type: string
        command:
          title: Command
          description: The command to use when starting a flow run. In most cases,
            this should be left blank and the command will be automatically generated
            by the worker.
          type: string
        memory:
            title: Memory
            description: Memory allocation for the execution environment.
            type: integer
            default: 1024
        cpu:
            title: CPU
            description: CPU allocation for the execution environment.
            type: integer
            default: 500
```

This base job template defines which values deployment creators can provide on a per-deployment basis and how those 
values translate into the configuration values that the worker uses to create the execution environment.

Each attribute of the configuration class appears in the `job_configuration` section as a placeholder whose name matches the 
attribute name. The `variables` section is populated with the OpenAPI schema for each attribute. If a configuration class 
does not explicitly declare any template variables, the template variables are inferred from the configuration class 
attributes.

### Customize configuration attribute templates

You can customize the template for each attribute for situations where the configuration values need more sophisticated 
templating. For example, if you want to add units for the `memory` attribute, you can do so like this:

```python
from pydantic import Field
from prefect.workers.base import BaseJobConfiguration

class MyWorkerConfiguration(BaseJobConfiguration):
    memory: str = Field(
        default="1024Mi",
        description="Memory allocation for the execution environment.",
        # Custom template used for this attribute in `job_configuration`
        json_schema_extra=dict(template="{{ memory_request }}Mi")
    )
    cpu: str = Field(
        default="500m",
        description="CPU allocation for the execution environment.",
        # Custom template used for this attribute in `job_configuration`
        json_schema_extra=dict(template="{{ cpu_request }}m")
    )
```

This example changes the type of each attribute to `str` to accommodate the units and adds a `template` entry 
to each field through `json_schema_extra`. The `template` entry populates the `job_configuration` section of the resulting base job template.

For this example, the `job_configuration` section of the resulting base job template would look like this:

```yaml
job_configuration:
    name: "{{ name }}"
    env: "{{ env }}"
    labels: "{{ labels }}"
    command: "{{ command }}"
    memory: "{{ memory_request }}Mi"
    cpu: "{{ cpu_request }}m"
```

To use custom templates, you need to declare the template variables used in the template. This is because the names of 
those variables can no longer be inferred from the configuration class attributes.
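
To make the relationship between attributes, custom templates, and template variables concrete, here is a minimal standard-library sketch. It is a simplified stand-in and not Prefect's implementation: `SketchConfiguration`, `build_job_configuration_template`, and `referenced_variables` are hypothetical names, and dataclass field metadata stands in for the `json_schema_extra` entry shown above.

```python
import re
from dataclasses import dataclass, field, fields
from typing import Dict, Set

# Matches placeholders such as "{{ memory_request }}" and captures the variable name.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.-]+)\s*\}\}")


@dataclass
class SketchConfiguration:
    """Hypothetical stand-in for a configuration class."""

    name: str = ""
    memory: str = field(
        default="1024Mi", metadata={"template": "{{ memory_request }}Mi"}
    )
    cpu: str = field(default="500m", metadata={"template": "{{ cpu_request }}m"})


def build_job_configuration_template(config_cls) -> Dict[str, str]:
    """Use a field's custom template if present, else "{{ <field name> }}"."""
    return {
        f.name: f.metadata.get("template", "{{ " + f.name + " }}")
        for f in fields(config_cls)
    }


def referenced_variables(template: Dict[str, str]) -> Set[str]:
    """Collect every variable name the template refers to."""
    return {
        name for value in template.values() for name in PLACEHOLDER.findall(value)
    }


job_configuration = build_job_configuration_template(SketchConfiguration)
variables_to_declare = referenced_variables(job_configuration)
```

In this sketch, `variables_to_declare` contains `memory_request` and `cpu_request` rather than `memory` and `cpu`, which is why those names have to be declared explicitly once custom templates are in use.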

### Rules for template variable interpolation

When defining a job configuration model, it's useful to understand how template variables are interpolated into the job configuration. 
The templating engine follows a few simple rules:

1. If a template variable is the only value for a key in the `job_configuration` section, the key's value is replaced with the value 
of the template variable.
2. If a template variable is part of a string (meaning there is text before or after the template variable), the value of the template 
variable is interpolated into the string.
3. If a template variable is the only value for a key in the `job_configuration` section and no value is provided for the template 
variable, the key is removed from the `job_configuration` section.

These rules allow worker developers and work pool maintainers to define template variables that are complex types like dictionaries 
and lists. These rules also mean that worker developers should give reasonable default values to job configuration fields whenever 
possible. This is because values are not guaranteed to be provided if template variables are unset.
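
The three rules above can be sketched in a few lines of standard-library Python. This is an illustrative re-implementation under simplifying assumptions, not Prefect's templating engine: the `resolve` function, the `_REMOVE` sentinel, and the choice to substitute an empty string for an unset variable inside a longer string are all hypothetical.

```python
import re
from typing import Any, Dict

# Matches placeholders such as "{{ memory_request }}" and captures the variable name.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.-]+)\s*\}\}")
_REMOVE = object()  # sentinel marking keys that should be dropped


def resolve(template: Any, values: Dict[str, Any]) -> Any:
    """Interpolate `values` into `template` following the three rules."""
    if isinstance(template, dict):
        resolved = {key: resolve(item, values) for key, item in template.items()}
        # Rule 3: drop keys whose sole placeholder had no value.
        return {key: item for key, item in resolved.items() if item is not _REMOVE}
    if not isinstance(template, str):
        return template
    sole_placeholder = PLACEHOLDER.fullmatch(template)
    if sole_placeholder:
        # Rule 1: the placeholder is the whole value, so the variable's value
        # is used as-is and may be a dict, list, or number.
        return values.get(sole_placeholder.group(1), _REMOVE)
    # Rule 2: the placeholder is part of a longer string, so interpolate text.
    # Substituting "" for an unset variable is an assumption of this sketch.
    return PLACEHOLDER.sub(lambda m: str(values.get(m.group(1), "")), template)


template = {
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "memory": "{{ memory_request }}Mi",
    "cpu": "{{ cpu_request }}m",
}
values = {"labels": {"team": "data"}, "memory_request": 2048, "cpu_request": 250}
job_configuration = resolve(template, values)
```

Here `labels` keeps its dictionary type (rule 1), `memory` and `cpu` become strings with units (rule 2), and `command` disappears because no value was provided (rule 3). The missing `command` key is the case where a default on the configuration class matters.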

### Template variable usage strategies

Template variables define the interface that deployment creators interact with to configure the execution environments of their 
deployments. The complexity of this interface is controllable through the template variables that are defined for a base job template. 
This control allows work pool maintainers to find a point along the spectrum of flexibility and simplicity appropriate for their 
organization.

There are two patterns that are represented in current worker implementations:

#### Pass-through

In the pass-through pattern, template variables pass through to the job configuration with little change. This pattern exposes 
complete control to deployment creators but also requires them to understand the details of the execution environment.

This pattern is useful when the execution environment is simple, and the deployment creators are expected to have high technical 
knowledge.

The [Docker worker](https://reference.prefect.io/prefect_docker/worker/) is an example of a worker that uses this pattern.

#### Infrastructure as code templating

Depending on the infrastructure they interact with, workers can sometimes employ a declarative infrastructure syntax 
(infrastructure as code) to create execution environments (for example, a Kubernetes manifest or an ECS task definition).

In the IaC pattern, it's useful to template portions of the declarative syntax with template variables. The worker 
then renders the template into its final form when it creates the execution environment.

This approach allows work pool creators to provide a simpler interface to deployment creators while also controlling which portions 
of infrastructure are configurable by deployment creators.

The [Kubernetes worker](https://reference.prefect.io/prefect_kubernetes/worker/) is an example of a worker that uses this pattern.

### Configure credentials

When executing flow runs within cloud services, workers will often need credentials to authenticate with those services. 
For example, a worker that executes flow runs in AWS Fargate needs AWS credentials. As a worker developer, you can use 
blocks to accept credentials configuration from the user.

For example, to allow the user to configure AWS credentials, you can do so like this:

```python
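from typing import Optional

from prefect.workers.base import BaseJobConfiguration
from prefect_aws import AwsCredentials
from pydantic import Field

class MyWorkerConfiguration(BaseJobConfiguration):
    # Optional because the default is None when no block is assigned
    aws_credentials: Optional[AwsCredentials] = Field(
        default=None,
        description="AWS credentials to use when creating AWS resources."
    )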
```

Users can create and assign a block to the `aws_credentials` attribute in the UI and the worker will use these credentials when 
interacting with AWS resources.

## Worker template variables

Providing template variables for a base job template defines the fields that deployment creators can override per deployment. 
The work pool creator ultimately defines the template variables for a base job template, but the worker developer can 
define default template variables for the worker to make it easier to use.

Default template variables for a worker are defined by implementing the `BaseVariables` class. Like the `BaseJobConfiguration` 
class, the `BaseVariables` class has attributes that are common to all workers:

| Attribute | Description |
| --------- | ----------- |
| `name` | The name to assign to the created execution environment. |
| `env` | Environment variables to set in the created execution environment. |
| `labels` | The labels assigned to the created execution environment for metadata purposes. |
| `command` | The command to use when starting a flow run. |

You can add additional attributes to the `BaseVariables` class to define additional template variables. For example, 
if you want to allow memory and CPU requests for your worker, you can do so like this:

```python
from pydantic import Field
from prefect.workers.base import BaseVariables

class MyWorkerTemplateVariables(BaseVariables):
    memory_request: int = Field(
        default=1024,
        description="Memory allocation for the execution environment."
    )
    cpu_request: int = Field(
        default=500,
        description="CPU allocation for the execution environment."
    )
```

When `MyWorkerTemplateVariables` is used in conjunction with `MyWorkerConfiguration`, 
the resulting base job template will look like this:

```yaml
job_configuration:
    name: "{{ name }}"
    env: "{{ env }}"
    labels: "{{ labels }}"
    command: "{{ command }}"
    memory: "{{ memory_request }}Mi"
    cpu: "{{ cpu_request }}m"
variables:
    type: object
    properties:
        name:
          title: Name
          description: Name given to infrastructure created by a worker.
          type: string
        env:
          title: Environment Variables
          description: Environment variables to set when starting a flow run.
          type: object
          additionalProperties:
            type: string
        labels:
          title: Labels
          description: Labels applied to infrastructure created by a worker.
          type: object
          additionalProperties:
            type: string
        command:
          title: Command
          description: The command to use when starting a flow run. In most cases,
            this should be left blank and the command will be automatically generated
            by the worker.
          type: string
        memory_request:
            title: Memory Request
            description: Memory allocation for the execution environment.
            type: integer
            default: 1024
        cpu_request:
            title: CPU Request
            description: CPU allocation for the execution environment.
            type: integer
            default: 500
```

Template variable classes are never used directly. They are used to generate a schema to 
populate the `variables` section of a base job template and validate the template variables provided by the user.

We don't recommend using template variable classes within your worker implementation for validation purposes because the 
work pool creator ultimately defines the template variables. The configuration class should handle any necessary run-time validation.
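
As a rough illustration of how the `variables` schema and per-deployment overrides fit together, the sketch below collects defaults from a schema and lets overrides take precedence. It is a simplified assumption about the flow of values, not Prefect's code: `collect_variable_values` is a hypothetical helper and the schema is a trimmed copy of the example above.

```python
from typing import Any, Dict


def collect_variable_values(
    variables_schema: Dict[str, Any], overrides: Dict[str, Any]
) -> Dict[str, Any]:
    """Start from schema defaults, then apply per-deployment overrides."""
    properties = variables_schema.get("properties", {})
    defaults = {
        name: spec["default"] for name, spec in properties.items() if "default" in spec
    }
    # Variables without a default and without an override stay unset.
    return {**defaults, **overrides}


variables_schema = {
    "type": "object",
    "properties": {
        "name": {"title": "Name", "type": "string"},
        "memory_request": {"title": "Memory Request", "type": "integer", "default": 1024},
        "cpu_request": {"title": "CPU Request", "type": "integer", "default": 500},
    },
}

# A deployment that only overrides the CPU request.
variable_values = collect_variable_values(variables_schema, {"cpu_request": 250})
```

In this sketch `name` has no default and no override, so it stays unset. That is the situation in which the configuration class, not the template variable class, needs a sensible default or run-time validation.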

## Worker implementation

Workers set up execution environments using provided configuration. Workers also observe the execution environment as the 
flow run executes and report any crashes to the Prefect API.

### Attributes

To implement a worker, you must subclass `BaseWorker` and provide it with the following attributes:

| Attribute | Description | Required |
| --------- | ----------- | -------- |
| `type` | The type of the worker. | Yes |
| `job_configuration` | The configuration class for the worker. | Yes |
| `job_configuration_variables` | The template variables class for the worker. | No |
| `_documentation_url` | Link to documentation for the worker. | No |
| `_logo_url` | Link to a logo for the worker. | No |
| `_description` | A description of the worker. | No |

### Methods

#### `run`

In addition to the attributes above, you must also implement a `run` method. The `run` method is called for each flow run 
the worker receives for execution from the work pool.

The `run` method has the following signature:

```python
async def run(
    self,
    flow_run: FlowRun,
    configuration: BaseJobConfiguration,
    task_status: Optional[anyio.abc.TaskStatus] = None,
) -> BaseWorkerResult:
    ...
```

The `run` method receives the flow run to execute, the execution environment configuration for the flow run, and a task status 
object that allows the worker to track whether the flow run was submitted successfully.

The `run` method must also return a `BaseWorkerResult` object that contains information about 
the flow run execution. For the most part, you can subclass `BaseWorkerResult` with no modifications like this:

```python
from prefect.workers.base import BaseWorkerResult

class MyWorkerResult(BaseWorkerResult):
    """Result returned by the MyWorker."""
```

To return more information about a flow run, add attributes to your `BaseWorkerResult` subclass.

### Worker implementation example

Below is an example of a worker implementation. This example is not intended as a complete implementation but to 
illustrate the previously mentioned concepts.

```python
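from typing import Optional

import anyio.abc
from pydantic import Field

from prefect.client.schemas.objects import FlowRun
from prefect.workers.base import (
    BaseJobConfiguration,
    BaseVariables,
    BaseWorker,
    BaseWorkerResult,
)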

class MyWorkerConfiguration(BaseJobConfiguration):
    memory: str = Field(
        default="1024Mi",
        description="Memory allocation for the execution environment.",
        json_schema_extra=dict(template="{{ memory_request }}Mi")
    )
    cpu: str = Field(
        default="500m",
        description="CPU allocation for the execution environment.",
        json_schema_extra=dict(template="{{ cpu_request }}m")
    )

class MyWorkerTemplateVariables(BaseVariables):
    memory_request: int = Field(
        default=1024,
        description="Memory allocation for the execution environment."
    )
    cpu_request: int = Field(
        default=500, 
        description="CPU allocation for the execution environment."
    )

class MyWorkerResult(BaseWorkerResult):
    """Result returned by the MyWorker."""

class MyWorker(BaseWorker):
    type: str = "my-worker"
    job_configuration = MyWorkerConfiguration
    job_configuration_variables = MyWorkerTemplateVariables
    _documentation_url = "https://example.com/docs"
    _logo_url = "https://example.com/logo"
    _description = "My worker description."

    async def run(
        self, flow_run: FlowRun, configuration: BaseJobConfiguration, task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> BaseWorkerResult:
        # Create the execution environment and start execution
        job = await self._create_and_start_job(configuration)

        if task_status:
            # Use a unique ID to mark the run as started. This ID is later used to tear down infrastructure
            # if the flow run is cancelled.
            task_status.started(job.id) 

        # Monitor the execution
        job_status = await self._watch_job(job, configuration)

        exit_code = job_status.exit_code if job_status else -1 # Get result of execution for reporting
        return MyWorkerResult(
            status_code=exit_code,
            identifier=job.id,
        )
```

Most of the execution logic is omitted from the example above, but it shows that the typical order of operations in the `run` method is:

1. Create the execution environment and start the flow run execution
2. Mark the flow run as started with the passed `task_status` object
3. Monitor the execution
4. Get the execution's final status from the infrastructure and return a `BaseWorkerResult` object
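
The order of operations above can be tried out without any Prefect dependency. The following sketch uses only the standard library and a local subprocess as the "execution environment". Everything in it is a hypothetical stand-in: `SketchResult` plays the role of a `BaseWorkerResult` subclass, `RecordingTaskStatus` mimics only the `started` call of the task status object, and `run_sketch` is not a real worker method.

```python
import asyncio
import sys
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchResult:
    """Stand-in for a `BaseWorkerResult` subclass."""

    identifier: str
    status_code: int


class RecordingTaskStatus:
    """Stand-in for the task status object; it only records `started` calls."""

    def __init__(self) -> None:
        self.started_with: Optional[str] = None

    def started(self, value: str) -> None:
        self.started_with = value


async def run_sketch(
    command: List[str], task_status: Optional[RecordingTaskStatus] = None
) -> SketchResult:
    # 1. Create the execution environment and start execution.
    process = await asyncio.create_subprocess_exec(*command)
    identifier = str(process.pid)

    # 2. Report the infrastructure identifier as soon as the job has started.
    if task_status is not None:
        task_status.started(identifier)

    # 3. Monitor the execution until it finishes.
    exit_code = await process.wait()

    # 4. Return the final status for reporting.
    return SketchResult(identifier=identifier, status_code=exit_code)


status = RecordingTaskStatus()
result = asyncio.run(
    run_sketch([sys.executable, "-c", "raise SystemExit(3)"], task_status=status)
)
```

The identifier is reported before the sketch starts waiting on the process, mirroring step 2: the caller learns how to find the infrastructure while the run is still in progress.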

To see other examples of worker implementations, see the [`ProcessWorker`](https://reference.prefect.io/prefect/workers/process/) and 
[`KubernetesWorker`](https://reference.prefect.io/prefect_kubernetes/worker/) implementations.

### Integrate with the Prefect CLI

You can start workers through the Prefect CLI by providing the `--type` option to the `prefect worker start` CLI command. 
To make your worker type available through the CLI, it must be available at import time.

If your worker is in a package, you can add an entry point to your setup file in the following format:

```python
entry_points={
    "prefect.collections": [
        "my_package_name = my_worker_module",
    ]
},
```

Prefect discovers this entry point and imports the specified module, which registers your worker type. The entry point 
makes the worker available through the CLI.
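
After installing your package, one way to check that the entry point is registered is to list the `prefect.collections` group with the standard library's `importlib.metadata`. This is a sketch for inspection only and says nothing about how Prefect itself loads entry points. `collection_entry_points` is a hypothetical helper name.

```python
from importlib.metadata import entry_points
from typing import Dict


def collection_entry_points(group: str = "prefect.collections") -> Dict[str, str]:
    """Return {entry point name: target module} for the given group."""
    discovered = entry_points()
    if hasattr(discovered, "select"):
        # Newer Python versions expose a selectable collection.
        selected = discovered.select(group=group)
    else:
        # Older versions return a mapping of group name to entry points.
        selected = discovered.get(group, [])
    return {entry_point.name: entry_point.value for entry_point in selected}


registered = collection_entry_points()
for name, target in sorted(registered.items()):
    print(f"{name} -> {target}")
```

If the package from the example above is installed, the listing should include an entry named `my_package_name` pointing at `my_worker_module`.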
